當事件越來越多的時候,每次查詢aggregate時,若從事件溯源(Event Sourcing)第1個開始一個個往後滾,會使查詢變慢,但Query查詢是映射過僅供查詢用的資料,不寫入時使用的model應該是DDD完整概念的aggregate,所以不能去查我們實作的DTO,而解決這效能問題的方式之一就是snapshot(快照)
在事件Repository裡實作 snapshot的相關method:
#[async_trait]
impl PersistedEventRepository for SurrealEventRepository {
async fn get_snapshot<A: Aggregate>(&self, aggregate_id: &str)
-> Result<Option<SerializedSnapshot>, PersistenceError> {
let result = self.db
.select(("snapshots", aggregate_id))
.await?;
Ok(result)
}
async fn persist<A: Aggregate>(
&self,
events: &[SerializedEvent],
snapshot_update: Option<(String, Value, usize)>
) -> Result<(), PersistenceError> {
match snapshot_update {
None => {
for event in events {
let _: Vec<SerializedEvent> = self.db
.create("events")
.content(event)
.await?;
}
}
Some((aggregate_id, aggregate, current_snapshot)) => {
let mut current_sequence = 0;
for event in events {
current_sequence = event.sequence;
let _s: Vec<SerializedEvent> = self.db
.create("events")
.content(event)
.await?;
}
let snapshot = SerializedSnapshot{
aggregate_id: aggregate_id.clone(),
aggregate,
current_sequence,
current_snapshot,
};
if current_snapshot == 1 {
let _s: Option<SerializedSnapshot> = self.db
.create(("snapshots", aggregate_id))
.content(snapshot)
.await?;
} else {
let _s: Option<SerializedSnapshot> = self.db
.update(("snapshots", aggregate_id))
.content(snapshot)
.await?;
}
}
}
Ok(())
}
}
在儲存事件時,加判斷是否為snapshot,若否(None)則只存事件,若是則存入snapshot,再判斷當前snapshot版本若為1則新增,若為2則更新。
其中snapshot_update
的三個參數: